查看原文
其他

独家|​58大数据平台任务调度系统的设计与实践

姚连斌 58技术 2022-03-15


导语

58DP作为58大数据平台任务调度系统,是一个通用的大数据分布式调度平台,与58大数据生态深度融合,提供一站式大数据开发与调度服务,是58数据中台战略的具体实践成果之一;目前服务于集团全业务线,支持ETL、Hive、Spark、Storm等常见的大数据处理技术组件,每天支持任务运行10万+;具备良好的高容错、易扩展、高并发等特点。


本文对58大数据平台任务调度系统进行了详细介绍,希望对大家有所启发。


背景

58DP作为58大数据平台任务调度系统,是一个通用的大数据分布式调度平台,与58大数据生态深度融合,提供一站式大数据开发与调度服务,是58数据中台战略的具体实践成果之一。目前服务于集团全业务线,支持ETL、Hive、Spark、Storm等常见的大数据处理技术组件,每天支持任务运行10万+。具备良好的高容错、易扩展、高并发等特点。


项目架构

58DP在大数据生态中有着承上启下的作用,是大数据开发的一个核心的基础设施,帮助业务解决大数据开发管理、任务调度、任务监控等复杂问题,项目架构图如下:


Bakery是58DP对外提供的UI界面,用户可以在Bakery系统上完成数据开发、数据调度、元数据管理等操作。

用户可以通过58DP已支持的组件进行数据开发,这些组件包括Spark SQL/Spark Streaming、Hive(MR)、Flink以及大量平台自研的工具,直接对接底层HDFS、MySQL等存储系统。用户会将58DP作业生成的数据对接到自己的业务系统,或者使用数据中台提供的一站式大数据分析平台云窗来实现数据分析与可视化。

Duolong调度API是一套封装了调度系统的简易接口,对用户屏蔽了调度相关的一系列复杂的问题,下面将展开分析58DP调度系统是如何实现各种类型的作业调度,以及如何解决用户在任务开发和历史数据修复相关的一系列业务问题。


技术架构

58DP调度系统主要包括Bakery、Duolong、Signaler、Bakery-cron等子系统。整体集群部署架构如下:

Bakery以主备的方式部署,由nginx作代理实现流量切换,在主节点失效后,会将web请求发送给备用节点处理;Signaler采用的是微服务的框架,由nginx来进行动态流量分发;Bakery-cron是同步服务,单独部署;Duolong比较复杂,我们从高可用,系统资源管理,以及系统的扩展性三个方面来分析。

 Duolong集群是由主备Scheduler调度节点和多个Worker执行节点组成的。Worker节点以Scheduler主节点为核心,组成一个大的调度集群。Worker向Scheduler注册实现直连,并定时向Scheduler汇报心跳信息,心跳信息包括槽位状态、CPU状态、内存状态等节点信息。Scheduler维护Worker的列表,通过负载均衡策略向Worker分发任务。


1. 主从架构设计

Duolong的集群架构如下图:

下面我们分别从Scheduler和Worker的角度来分析Duolong是如何实现高可用的。

1)Scheduler 主备切换

调度节点Scheduler采用主从模式。Scheduler通过向zk获取竞争锁来确定master和slave。slave节点会一直监听master,一旦master失去连接,slave节点马上升级为master。

slave变为master节点的过程是平滑的,这是因为任务在分发到Worker节点之前的每一个阶段都是有状态的。新的master节点会在初始化时根据任务的状态恢复任务,正在调度中的任务会在master节点切换之前的基础上继续被处理。

Worker有失败重连机制,master切换时,worker会重新连到新的master节点上。

2)Worker 自动故障转移

Worker节点之间互相独立,一个节点出现问题不会影响其他节点的使用。调度系统通过两个方面来保证这一点:

  1. 一方面,Scheduler通过心跳机制确认Worker是否可用,如果10分钟之内接收不到Worker心跳,Scheduler会把Worker从自己分发任务的机器列表中删除,从而禁止再向该Worker分发任务;

  2. 另一方面,对于在出问题Worker节点上执行的任务,Scheduler根据策略任务回收,重新分发到其他Worker节点。

Scheduler的回收策略基于两点:

  1. 确认超时,Scheduler向Worker节点分发任务采用ack机制,Scheduler 收不到ack信息或者ack超时,任务被回收重新分发到其他可用的Worker节点。

  2. 执行超时,任务在Worker节点执行太久,任务会被回收并标识任务已失败,同时给用户发告警通知。


2.  槽位资源管理

58DP调度系统通过槽位(slot)来管理调度资源。Worker每执行一个任务需要占用一个槽位(slot),一个槽位实际上是代表一个工作线程。

1)槽位资源模型

在Worker内部维护了一个执行队列,队列的大小根据运行环境的承载能力设置。队列的大小即槽位数。Worker每执行一个任务就使用一个槽位,任务执行完毕槽位就会被回收。因此槽位的多少代表了一个Worker的并行处理能力。

每一个Worker都支持多种任务类型(如Hive-SQL、数据抽取、MySQL脚本、Spark streaming等),Hadoop底层通过多租户实现部门之间的资源管理。系统通过槽位量化Worker的处理能力,并实现任务分发的动态调整和Worker负载均衡。

 另外,通过计算集群的总体槽位使用情况,预估未来业务量的增长情况,即可方便的对集群扩容进行预估,实现Worker节点的横向扩展。

2)负载均衡

Scheduler对任务的分发策略遵循如下公式:

其中freeSlots为Worker的可使用的槽位数,memory为Worker节点的内存使用率。

公式可以描述为:取内存使用率小于90%且有最多可用槽位数的Worker。内存使用率的限制保证了Worker可用,而取最多可用槽位数的Worker保证任务能够均衡的分发到各个节点。


3. 系统扩展性

Duolong的扩展性体现在Worker可以根据系统的负载实现弹性伸缩。当系统资源(槽位数)不足时,可以方便的加入新的Worker。Worker的自动故障转移机制保障Worker节点的增加和减少不会影响系统任务的正常运行。


调度模型设计

调度模型如下:

1. 用户在Bakery用户界面创建任务。

2. Scheduler收到任务后立即提交Worker执行,如果是周期性任务,由Quartz定时触发执行。

3. Worker负责任务创建上下文环境,并根据具体的任务类型选择调用相应的Executor来执行。同时,判断该任务是否存在依赖任务,通过Signaler判断信号状态。

4. Executor负责任务的具体执行,并将结果日志存储Hbase。

5. 日志同步服务负责同步调度执行进度信息到Bakery,实时展示给用户,完成整个任务执行的闭环流。


1. 任务分发

1)任务的生命周期

作业提交后形成一个执行任务,执行任务从用户提交开始,需要经过等待调度,限流排队,等待信号,串行排队,等待槽位5个阶段,任务才能被分发到worker节点执行,下图是任务执行生命周期示意图。

1. 等待调度,即任务在提交后等待系统调度的阶段。

2. 限流排队,任务在手动提交或者定时调度提交时,系统会根据用户资源限制进行限流处理。被限流的任务停留在此队列。

3. 等待信号,如果当前任务的上游信号未生成,则该任务会进入等待信号队列进行排队。

4. 串行排队,串行排队主要是指同一任务在不支持并行的情况下(由业务决定),如果前一个周期未执行完,则需要进入串行排队队列,等待它的前一个周期执行完后才能开始。

5. 等待槽位,在前面的所有检查都通过后,Scheduler会检查当前是否有可用的Worker节点以及空闲的槽位,如果资源充足任务才会被分发到Worker节点进行执行。否则进入等待槽队列排队。

任务进入队列后,会根据一定的优先级顺序处理。

2)任务的执行优先级

58DP根据任务产生的来源不同,将任务分为5大类,对应的任务的优先级规则定义如下:

1. 任务重跑 : X + 时间戳

2. 出错重试 : F + 时间戳

3. 系统调度 : C + 时间戳

4. API提交 : B + 时间戳

5. 手动测试(单次运行): A + 时间戳

优先级的先后顺序是:任务重跑 > 出错重试 > 系统调度 > API提交 > 手动测试。

调度系统在生命周期的每一个阶段都是通过队列轮询的方式来进行任务处理的。在轮询的过程中,任务先出队,如果满足所有的条件检查,就会进入下一个队列,否则会在队列处理完毕后全部重新入队。任务入队的逻辑如下:

首先,不同任务或者不同周期的同一任务,队列中轮询次数多的(等待时间久)的任务优先处理,轮询次数相同,任务优先级高的优先执行;其次,同一任务的相同周期,优先级高的任务优先处理。

下图是一个任务入队的例子,由于任务执行结果对所有下游的是共享的,所以同一个任务同一周期只能有一个实例运行。

58DP默认情况下支持任务的并行执行,任务之间的依赖通过信号系统来保证。


2.  任务依赖

每一个任务执行完,都会生成相应的信号,下游根据上游任务执行信号,来判断是否需要继续执行,信号依赖模型说明如下图。

只有上游所有信号均已生成且成功的情况下,任务才会继续执行,否则会进入等待信号队列等待。这种机制,使得开发者在数据开发的过程中只需要关注上游依赖的任务即可。


3.  任务的失败重试

58DP支持Failover重试机制,任何异常中断的任务都会重新提交,用户也可以配置失败重试次数,避免因底层平台不稳定导致的异常失败,保证尽最大可能实现任务正常结束。


业务问题的解决方案

通过前面的分析,我们已经可以保证例行任务正常运行了,但是业务问题常常是非常复杂的,典型的就是历史数据回跑的场景,我们分两种情况来说明。


1.  重跑按天顺序执行的任务场景

58DP支持任务并发执行,以保证资源最大化利用的前提下尽量降低任务等待和执行时间。但是某些场景下要求回跑历史数据时,作业必须按天顺序执行,例如作业A执行的输出是当天新增用户,统计第二天的新增用户需要以前一天的新增用户作为历史用户基数再次计算。为了解决此类问题,在信号依赖层我们增加了自依赖的功能。

1)自依赖

如上图,一个配置了自依赖功能的作业,执行当天任务时需要依赖上一个周期(即昨天)的信号,通过这种机制,可以实现例行作业和重跑作业统一。实现上也非常简单,在设计依赖信号时,我们就增加了等待周期字段,假如作业A的job_id等于1921,那么它的自依赖配置如第三列所示。

字段说明示例:作业A
job_id当前作业ID1921
parent_id依赖的父作业ID1920
run_cycle依赖周期类型,如daily|hourly等daily
offset依赖的周期偏移量-1

由于调度默认是支持并行和乱序的,通过前面的任务状态图可以发现,如果一个任务依赖的周期在串行排队和限流排队队列,就会导致该任务无限等待。为了解决这个问题58DP引入了弹性队列。

2)弹性队列

如果固定队列已满的情况下,队列中所有任务都在等待信号,很可能所有需要优先执行的任务不在该队列,这时58DP会在串行排队的队列轮训之间验证信号,如果信号准备就绪,并且执行的固定队列已满,就将该任务放到运行队列的弹性队列,立即得到执行,如下图,从而可以解决运行队列全部等待的情况。


2.  需要同时调起上下游的任务场景

任务回跑是离线作业最常见的应用场景了,业务中可能需要以下功能:

1. 支持作业按时间范围回跑

2. 支持作业上下游同时回跑

3. 支持作业上下游部分作业回跑

58DP将一次回跑抽象为一个回跑任务,同一个回跑任务里,回跑任务支持单个作业回跑和多作业回跑,支持按天的时间范围(持按天、周、月周期的时间范围回跑)。如下图所示:

用户需要选择待重跑的任务,并分析已选任务的上下游依赖任务。

点击分析前置任务/依赖任务按钮,即可查看已选任务的前置任和依赖任务拓扑图,选择本次需要重跑的任务,点击提交即可。

目前,任务执行结果信号仅保留2年,超过2年的数据会被归档,归档后的信号默认按成功处理。系统支持不对重跑批次之外的任务检查信号。例如,加入一批任务依赖关系如下图。


当用户需要从任务B开始重跑时,任务A和任务E的信号会自动忽略,而保留重跑批次内的任务依赖关系。


总结与展望

虽然58DP功能已经足够完整,用户覆盖集团全部业务线,但随着业务的快速增长,系统的应用场景更加复杂多变,为了能够快速响应,我们还需要优化集群,采用环形无中心架构,提升集群的稳定性和高可用能力;研发轻量级的执行Worker节点,提供离线任务统一调度的解决方案;增加动态拓扑调度结构,优化作业调度时机;结合智能AI实现更加智能的任务分析和预测,支持资源更加精细的管控能力等等。


作者简介

姚连斌,TEG数据产品研发部,高级数据开发工程师。


END



其他推荐:
一招提速30ms,解密58同镇推荐业务之动态日志级别配置实践
人物|陈兴振:如何加速AI算法研发?58是这样解决的
一招提速30ms,解密58同镇推荐业务之动态日志级别配置实践
安居客 Android App的平台化演进之路

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存